 1.Kafka源码剖析之源码阅读环境搭建
   
   首先下载源码：http://archive.apache.org/dist/kafka/1.0.2/kafka-1.0.2-src.tgz
   gradle-4.8.1 下载地址：https://services.gradle.org/distributions/gradle-4.8.1-bin.zip
   Scala-2.12.12 下载地址：https://downloads.lightbend.com/scala/2.12.12/scala-2.12.12.msi
   1).安装配置Gradle
   解压gradle-4.8.1-bin.zip到一个目录
   配置环境变量，其中GRADLE_HOME指向gradle解压到的根目录，GRADLE_USER_HOME指向gradle的本地仓库位置。
   GRADLE_USER_HOME D:\gradleRepo
   PATH环境变量
   %GRADLE_HOME%\bin
   进入GRADLE_USER_HOME目录，添加init.gradle，配置gradle的源：
   init.gradle内容：
// Gradle init script (placed in GRADLE_USER_HOME/init.gradle): routes all
// Maven dependency resolution through Aliyun mirrors, which is required for
// reliable/fast downloads from mainland China.
allprojects {
	repositories {
		// Aliyun mirror repositories, consulted before anything the build declares.
		maven { url 'https://maven.aliyun.com/repository/public/' }
        maven { url 'https://maven.aliyun.com/nexus/content/repositories/google' }
        maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
        maven { url 'https://maven.aliyun.com/nexus/content/repositories/jcenter'}

		// Remove the upstream repositories registered by the build itself, so
		// every artifact is forced to come from the mirrors above.
		all { ArtifactRepository repo ->
		    if (repo instanceof MavenArtifactRepository) {
				def url = repo.url.toString()
				if (url.startsWith('https://repo.maven.apache.org/maven2/') ||
url.startsWith('https://repo.maven.org/maven2') ||
url.startsWith('https://repo1.maven.org/maven2') ||
url.startsWith('https://jcenter.bintray.com/')) {
	                 //project.logger.lifecycle "Repository ${repo.url} replaced by $REPOSITORY_URL."
					 remove repo
                }
           }
	  }
    }

	// Same mirroring for buildscript (plugin classpath) resolution.
	buildscript {
		repositories {
			maven { url 'https://maven.aliyun.com/repository/public/'}
            maven { url 'https://maven.aliyun.com/nexus/content/repositories/google'}
            maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
            maven { url 'https://maven.aliyun.com/nexus/content/repositories/jcenter'}

			// Drop Maven Central / jcenter from the buildscript search path too.
			all { ArtifactRepository repo ->
			    if (repo instanceof MavenArtifactRepository) {
					def url = repo.url.toString()
                    if (url.startsWith('https://repo1.maven.org/maven2') ||
url.startsWith('https://jcenter.bintray.com/')) {
	                   //project.logger.lifecycle "Repository ${repo.url} replaced by $REPOSITORY_URL."
                       remove repo
                    }
				}		
		    }		
		}	
	}
}
    保存并退出，打开cmd，运行：gradle -v
	若能正确输出Gradle的版本信息，则设置成功。
	2).Scala的安装和配置
	双击安装
	安装路径 D:\RunningApps\scala\
	新建系统变量
	SCALA_HOME   D:\RunningApps\scala
	添加gradle的bin目录到PATH中。
	%SCALA_HOME%\bin
	打开cmd，输入scala 验证：
	输入:quit退出Scala的交互式环境。
	3).Idea配置
	idea安装Scala插件
	Plugins中Marketplace 中installed
	4).源码操作
	解压源码
	打开CMD，进入kafka-1.0.2-src目录，执行：gradle
	结束后，执行gradle idea（注意不要使用生成的gradlew.bat执行操作）
	idea导入源码
	Import Project中选择 Gradle
	
 2.Kafka源码剖析之Broker启动流程
   
   1).启动kafka
   命令如下： kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties 。
   kafka-server-start.sh内容如下：
# Launcher for the Kafka broker: validates arguments, fills in default JVM
# options, then execs kafka-run-class.sh with the kafka.Kafka main class.
if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi
# Directory containing this script; quoted so paths with spaces work.
base_dir=$(dirname "$0")

# Default log4j configuration: config/log4j.properties next to the bin/ dir.
# NOTE: this must be a single line — breaking the -D option across lines
# corrupts the JVM argument.
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

# Default heap: fixed-size 1 GiB (min == max avoids resize pauses).
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# ${VAR-default}: use caller-supplied EXTRA_ARGS if set, else the defaults.
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    # Run in the background; consume the -daemon flag so "$@" starts at the
    # server.properties path.
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

# Replace this shell with the JVM launcher; remaining args are passed through.
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
   2).查看Kafka.Kafka源码
def main(args: Array[String]): Unit = {
  try {
    // Parse server.properties (plus any --override flags) into Properties.
    val props = getPropsFromArgs(args)
    // Wrap the broker in its lifecycle facade.
    val startable = KafkaServerStartable.fromProps(props)

    // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
    registerLoggingSignalHandler()

    // attach shutdown handler to catch terminating signals as well as normal termination
    Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
      override def run(): Unit = startable.shutdown()
    })

    // Boot the broker, then block until a shutdown has completed.
    startable.startup()
    startable.awaitShutdown()
  } catch {
    case e: Throwable =>
      // Any startup failure is fatal: log it and exit non-zero.
      fatal(e)
      Exit.exit(1)
  }
  // Normal termination path after awaitShutdown() returns.
  Exit.exit(0)
}
   上面的kafkaServerStartable封装了KafkaServer，最终执行startup的是KafkaServer
/**
 * Thin lifecycle wrapper around a [[KafkaServer]]: it translates startup and
 * shutdown failures into process exit codes and forwards state changes.
 */
class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  // The broker instance this facade manages.
  private val kafkaServer = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  /** Start the broker; on any failure, log and exit the process with status 1. */
  def startup(): Unit = {
    try kafkaServer.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we
        // invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  /** Stop the broker; if shutdown itself fails, halt the JVM with status 1. */
  def shutdown(): Unit = {
    try kafkaServer.shutdown()
    catch {
      case _: Throwable =>
        fatal("Halting Kafka.")
        Exit.halt(1)
    }
  }

  /** Forward an externally requested broker-state transition. */
  def setServerState(newState: Byte): Unit =
    kafkaServer.brokerState.newState(newState)

  /** Block until the broker has fully shut down. */
  def awaitShutdown(): Unit = kafkaServer.awaitShutdown()
}

   下面来看一下KafkaServer的startup方法，启动了很多东西，后面都会用到，代码中也加入了注释
/**
 * Boot the broker: guarded by CAS so only one caller wins, then brings up
 * every broker subsystem in dependency order (scheduler, ZK, log manager,
 * network, replication, controller, coordinators, request handling, health
 * check). On failure, shuts everything back down and rethrows.
 */
def startup() {
	try {
    info("starting")
     // Refuse to start while a shutdown is still in progress.
    if (isShuttingDown.get)
		throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
	// Already fully started: nothing to do.
    if (startupComplete.get)
      return
    // Atomically claim the right to start; exactly one caller can win the CAS.
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
		// Mark the broker as Starting.
        brokerState.newState(Starting)
        /* start scheduler */
        // Background task scheduler used by many components below.
        kafkaScheduler.startup()
        /* setup zookeeper */
        // Connect to ZooKeeper.
        zkUtils = initZk()
        /* Get or create cluster_id */
        // Read the cluster id from ZooKeeper, generating it on first startup.
        _clusterId = getOrGenerateClusterId(zkUtils)
        info(s"Cluster ID = $clusterId")
        /* generate brokerId */
        // Resolve the broker id (from config or auto-generated) and any log
        // dirs that failed to load.
        val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
        config.brokerId = brokerId
        // Logging context so every log line carries the broker id.
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        this.logIdent = logContext.logPrefix

		/* create and configure metrics */
        // Instantiate the MetricsReporter implementations listed in the config.
        val reporters =
config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp,
classOf[MetricsReporter],
           Map[String, AnyRef](KafkaConfig.BrokerIdProp ->
(config.brokerId.toString)).asJava)
        // A JMX reporter is always added by default.
        reporters.add(new JmxReporter(jmxPrefix))
        val metricConfig = KafkaServer.metricConfig(config)
        // Central metrics registry.
        metrics = new Metrics(metricConfig, reporters, time, true)

		/* register broker metrics */
		_brokerTopicStats = new BrokerTopicStats
		// Quota managers: enforce per-client upper bounds on produce and
		// consume throughput.
		quotaManagers = QuotaFactory.instantiate(config, metrics, time,
threadNamePrefix.getOrElse(""))
        // Notify cluster resource listeners.
        notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
        // Create the log manager. During creation it checks each log dir for a
        // .kafka_cleanshutdown marker; if absent, the broker enters the
        // RecoveringFromUncleanShutdown state.
        /* start log manager */
        logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState,
kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
        logManager.startup()
         // Cluster metadata cache for this broker.
         metadataCache = new MetadataCache(config.brokerId)
         // Credential provider for the enabled SASL mechanisms.
         credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
		 // Create and start the socket server acceptor threads so that the bound port is known.

         // Delay starting processors until the end of the initialization sequence to ensure
         // that credentials have been loaded before processing authentications.
         // Once its acceptors start, the SocketServer begins accepting
         // connections; request processing is deferred (startupProcessors = false).
		 socketServer = new SocketServer(config, metrics, time, credentialProvider)
         socketServer.startup(startupProcessors = false)
         // Replica manager: handles partition leadership and replication.
         /* start replica manager */
         replicaManager = createReplicaManager(isShuttingDown)
         replicaManager.startup()
         // Kafka controller; once started, the broker competes for the
         // controller znode in ZooKeeper.
         /* start kafka controller */
         kafkaController = new KafkaController(config, zkUtils, time, metrics,
threadNamePrefix)
         kafkaController.startup()
         // Admin manager for topic/config administration requests.
         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
         // Group coordinator for consumer group membership and offsets.
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it
         // would be good to fix the underlying issue
         groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager,
Time.SYSTEM)
         groupCoordinator.startup()
		 // Transaction coordinator, with its own single-thread scheduler for
		 // transaction expiration and transaction-log loading.
         /* start transaction coordinator, with a separate background thread
         scheduler for transaction expiration and log loading */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it
         // would be good to fix the underlying issue
         transactionCoordinator = TransactionCoordinator(config, replicaManager, new
KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils,
metrics, metadataCache, Time.SYSTEM)
         transactionCoordinator.startup()

		 // Build the authorizer, if one is configured.
         /* Get the authorizer and initialize it if one is specified.*/
         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map {
authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
         }
		 // KafkaApis routes each incoming request type to its handler.
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager,
adminManager, groupCoordinator, transactionCoordinator,
           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics,
authorizer, quotaManagers,
           brokerTopicStats, clusterId, time)
         // Thread pool that pulls requests off the channel and runs KafkaApis.
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
socketServer.requestChannel, apis, time,config.numIoThreads)
         Mx4jLoader.maybeLoad()

		 // Handlers for dynamic (ZooKeeper-driven) config changes, per entity type.
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new
TopicConfigHandler(logManager, config, quotaManagers),
           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
           ConfigType.User -> new UserConfigHandler(quotaManagers,
credentialProvider),
           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
         // Start listening for dynamic config change notifications.
         // Create the config manager. start listening to notifications
         dynamicConfigManager = new DynamicConfigManager(zkUtils,
dynamicConfigHandlers)
         dynamicConfigManager.startup()

		 // Advertise this broker to the cluster: resolve port 0 to the actual
		 // bound port before publishing.
		 /* tell everyone we are alive */
		 val listeners = config.advertisedListeners.map { endpoint =>
           if (endpoint.port == 0)
              endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
           else
              endpoint
         }
		 // Health check registers this broker's ephemeral znode in ZooKeeper.
         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils,
config.rack,config.interBrokerProtocolVersion)
         kafkaHealthcheck.startup()

		 // Persist the broker id locally for consistency checks on restart.
         // Now that the broker id is successfully registered via KafkaHealthcheck,checkpoint it
         checkpointBrokerId(config.brokerId)

		 // Begin processing requests and mark the broker as fully running.
         socketServer.startProcessors()
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
         AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
         info("started")
      }
   }
	  catch {
		 case e: Throwable =>
           fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
           isStartingUp.set(false)
           shutdown()
           throw e 
	  }
   }
	  
	
	